package cn.nicholasld.nmqs.message.mqtt;

import cn.nicholasld.nmqs.model.MqttConnection;
import cn.nicholasld.nmqs.utils.DataUtil;
import jakarta.websocket.Session;
import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @author NicholasLD
 * @createTime 2023/3/30 00:56
 */
@Async
public class MyMQTT extends Thread implements MqttCallback {
    Logger log = LoggerFactory.getLogger(MyMQTT.class);

    private final String token;
    private final Session webSocketSession;
    private final String sendTopic;
    private final String receiveTopic;
    private final Integer qos;
    private MqttClient mqttClient;
    private MqttConnection mqttConnection = new MqttConnection(0,"1.1.1.1", "8083", "", "");

    public MyMQTT(String token, Session webSocketSession, String sendTopic, String receiveTopic, Integer qos) {
        this.token = token;
        this.webSocketSession = webSocketSession;
        this.sendTopic = sendTopic;
        this.receiveTopic = receiveTopic;
        this.qos = qos;
    }

    @SneakyThrows
    public void connect() {
        MqttConnection thisMQTT = DataUtil.getMqttConnectionByToken(token);
        if (thisMQTT == null) {
            log.error("[MQTT] 未找到 Token 为 {} 的 MQTT 连接配置", token);
            return;
        }

        mqttConnection = thisMQTT;

        // 防止重复创建 MQTTClient 实例
        if (mqttClient == null) {
            createMqttClient();
        }

        MqttConnectOptions options = createMqttConnectOptions();

        if (!mqttClient.isConnected()) {
            handleConnection(options);
        } else {
            handleReconnection(options);
        }
    }

    private void createMqttClient() {
        try {
            String protocol = switch (mqttConnection.getProtocol()) {
                case 0 -> "ws";
                case 1 -> "tcp";
                case 2 -> "wss";
                default -> {
                    log.info("协议类型错误");
                    yield "";
                }
            };

            String serverUrl;
            if ("ws".equals(protocol) || "wss".equals(protocol)) {
                serverUrl = String.format("%s://%s:%s/mqtt", protocol, mqttConnection.getServer(), mqttConnection.getPort());
            } else {
                serverUrl = String.format("%s://%s:%s", protocol, mqttConnection.getServer(), mqttConnection.getPort());
            }
            mqttClient = new MqttClient(serverUrl, UUID.randomUUID().toString(), new MemoryPersistence());
            mqttClient.setCallback(this);
        } catch (MqttException e) {
            log.error("[MQTT] 创建 MQTTClient 实例失败", e);
        }
    }

    private MqttConnectOptions createMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();

        if (isNotEmpty(mqttConnection.getUsername()) && isNotEmpty(mqttConnection.getPassword())) {
            options.setUserName(mqttConnection.getUsername());
            options.setPassword(mqttConnection.getPassword().toCharArray());
        }

        return options;
    }

    private void handleConnection(MqttConnectOptions options) {
        try {
            mqttClient.connect(options);
            sendWebSocketMessage("[Websocket] 远程服务器连接成功");
            log.info("[MQTT] 连接成功！");
            subscribeToTopic();
        } catch (MqttException e) {
            handleConnectionError(e);
        } catch (IOException e) {
            log.error("[MQTT] WebSocket返回给前端信息失败");
        }
    }

    private void handleReconnection(MqttConnectOptions options) {
        try {
            mqttClient.unsubscribe(receiveTopic);
            mqttClient.disconnect();
            handleConnection(options);
        } catch (MqttException e) {
            log.error("[MQTT] 断开连接失败", e);
        }
    }

    private void subscribeToTopic() throws MqttException {
        mqttClient.subscribe(receiveTopic, qos);
    }

    private void handleConnectionError(MqttException e) {
        log.error("[MQTT] 连接失败，可能是服务器连接信息有误", e);
        try {
            if (webSocketSession.isOpen()) {
                sendWebSocketMessage("[Websocket] MQTT连接失败，可能是服务器连接信息有误");
            }
        } catch (IOException ex) {
            log.error("[MQTT] WebSocket返回给前端信息失败");
        } finally {
            closeMqttClient();
        }
    }

    @SneakyThrows
    public void start() {
        log.info("MQTT连接数据初始化成功，开始连接....");
        connect();
    }

    public void publish(String topic, String content) throws MqttException {
        MqttMessage message = new MqttMessage(content.getBytes());
        try {
            mqttClient.publish(topic, message);
        } catch (Exception e) {
            log.error("[MQTT] 发送失败", e);
        }
    }

    public void publish(String content) {
        MqttMessage message = new MqttMessage(content.getBytes());
        try {
            mqttClient.publish(sendTopic, message);
        } catch (Exception e) {
            log.error("[MQTT] 发送失败", e);
        }
    }

    @SneakyThrows
    public void reConnect() {
        if (mqttClient != null) {
            MqttConnectOptions options = new MqttConnectOptions();
            mqttClient.connect(options);
            log.info("[MQTT] 重连成功！");
        }
    }

    @Override
    public void connectionLost(Throwable throwable) {
        //判断是否因为网络原因导致的连接断开
        if (throwable instanceof MqttException mqttException) {
            if (mqttException.getReasonCode() == MqttException.REASON_CODE_CONNECTION_LOST) {
                log.error("[MQTT] 连接断开，系统尝试重连中...", throwable);
                reConnect();
            }
        }

        log.info("[MQTT] 用户断开连接", throwable);
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        log.info("[MQTT] 收到消息：{}", mqttMessage.toString());

        // 将消息发送到前端，但在发送前检查 WebSocket 是否仍然打开
        if (webSocketSession.isOpen()) {
            try {
                sendWebSocketMessage(mqttMessage.toString());
            } catch (IOException e) {
                log.error("[MQTT] 发送消息到前端失败", e);
            }
        } else {
            log.warn("[MQTT] WebSocket 会话已关闭， 消息将不会被送出: {}", mqttMessage);
        }
    }


    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("[MQTT] 发送消息完成");
    }

    @Override
    public void run() {
        start();
    }

    public void release() {
        closeMqttClient();
        this.interrupt();
    }

    private void closeMqttClient() {
        if (mqttClient != null && mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(receiveTopic);
                mqttClient.disconnect();
            } catch (MqttException e) {
                log.error("[MQTT] 断开连接失败", e);
            } finally {
                try {
                    mqttClient.close();
                } catch (MqttException e) {
                    log.error("[MQTT] 关闭失败", e);
                }
            }
        }
    }

    private void sendWebSocketMessage(String message) throws IOException {
        if (webSocketSession.isOpen()) {
            webSocketSession.getBasicRemote().sendText(message);
        } else {
            log.warn("[MQTT] WebSocket 会话已关闭， 消息将不会被送出: {}", message);
        }
    }

    private boolean isNotEmpty(String str) {
        return str != null && !str.isEmpty();
    }
}
